package org.example.partition;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.example.aggres.Dog;

/**
 * Demo of the physical partitioning operations available on a Flink DataStream.
 *
 * <p>The active example routes integers with a custom {@link Partitioner}; the other
 * strategies (shuffle, rebalance, rescale, broadcast, global) are kept as commented-out
 * one-liners that can be enabled one at a time.
 */
public class DataPartition {

    /** Number of buckets the parity partitioner distributes keys into (even / odd). */
    private static final int PARITY_BUCKETS = 2;

    /**
     * Builds the demo pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Sample stream consumed only by the commented-out demos below; it is kept so
        // those lines can be enabled without further changes.
        DataStreamSource<Dog> source = env.fromElements(
                new Dog("aa", "k", 1), new Dog("aa", "k", 1), new Dog("aa", "k", 1), new Dog("aa", "k", 1), new Dog("aa", "k", 1),
                new Dog("bb", "b", 5), new Dog("bb", "b", 5), new Dog("bb", "b", 5),
                new Dog("cc", "b", 6), new Dog("cc", "b", 6),
                new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8), new Dog("dd", "u", 8),
                new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4), new Dog("ee", "u", 4),
                new Dog("ff", "b", 4), new Dog("ff", "b", 4), new Dog("ff", "b", 4), new Dog("ff", "b", 4), new Dog("ff", "b", 4), new Dog("ff", "b", 4)

        );
        env.setParallelism(1);
//        Shuffle partitioning: spreads records evenly to avoid data skew
//        source.shuffle().print().setParallelism(5);
//        Round-robin partitioning (the default redistribution)
//        source.rebalance().print().setParallelism(4);
//        Rescale partitioning
//        env.addSource(new RichParallelSourceFunction<Integer>() {
//            @Override
//            public void run(SourceContext<Integer> ctx) throws Exception {
//                for (int i = 1; i <= 8; i++) {
//                    int i1 = getRuntimeContext().getIndexOfThisSubtask() % 2;
//                    System.out.println(i1);
//                    if (i % 2 == i1) {
//                        ctx.collect(i);
//                    }
//                }
//            }
//
//            @Override
//            public void cancel() {
//
//            }
//        }).setParallelism(2).rescale().print().setParallelism(4);

//        Broadcast partitioning: every parallel subtask processes every record
//        source.broadcast().print().setParallelism(4);

//        Global partitioning: all records are merged into a single subtask
//        source.global().print().setParallelism(4);

//        Custom partitioning: the element itself is the key and its parity picks the
//        target subtask, so only print subtasks 0 and 1 receive data even though the
//        sink runs with parallelism 4.
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new ParityPartitioner(), new IdentityKeySelector())
                .print()
                .setParallelism(4);

        env.execute();
    }

    /**
     * Sends even keys to partition 0 and odd keys to partition 1.
     *
     * <p>The result is always within {@code [0, numPartitions)}: {@link Math#floorMod(int, int)}
     * keeps negative keys non-negative, and the bucket count is capped at
     * {@code numPartitions} so a single downstream partition never yields index 1.
     */
    private static final class ParityPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            int buckets = Math.min(PARITY_BUCKETS, numPartitions);
            return Math.floorMod(key, buckets);
        }
    }

    /** Uses each element as its own partitioning key. */
    private static final class IdentityKeySelector implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    }
}